feat: Add TTFT support in OpenAI chat generator #8444
Conversation
Pull Request Test Coverage Report for Build 11614471154
💛 - Coveralls
Ok @LastRemote, regarding the streaming response, this approach worked for me. It is slightly different from yours; perhaps have a look at it and consider some of these changes.

@vblagoje Hello, I reviewed your approach and still have some concerns regarding the meta structure and the potential integration with Langfuse. Specifically, when a chunk is built, the current behavior captures fields like … Additionally, I converted the TTFT timestamp to ISO format. This change is intended to make it easier to log the entire ChatMessage object or include it as part of a service response. It also requires a slightly different update in the Langfuse integration (which I already implemented in my current project):
# Fragment from the updated Langfuse tracer; `tags`, `span`, and
# _SUPPORTED_CHAT_GENERATORS come from the surrounding tracer code,
# and `datetime` is `from datetime import datetime`.
if tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
    replies = span._data.get("haystack.component.output", {}).get("replies")
    if replies:
        meta = replies[0].meta
        completion_start_time = meta.get("completion_start_time")
        if completion_start_time:
            # completion_start_time is stored as an ISO-8601 string in the reply meta
            completion_start_time = datetime.fromisoformat(completion_start_time)
        span._span.update(
            usage=meta.get("usage") or None,
            model=meta.get("model"),
            completion_start_time=completion_start_time,
        )
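For context, the generator-side idea is roughly the following (a minimal sketch, not the actual PR diff; the FirstChunkTimer wrapper and its wiring are illustrative assumptions): record the wall-clock time when the first streamed chunk arrives, keep it as an ISO-8601 string, and attach it to the reply meta as completion_start_time so that tracers such as Langfuse can read it back.

from datetime import datetime, timezone
from typing import Callable, Optional

from haystack.dataclasses import StreamingChunk


class FirstChunkTimer:
    """Wraps a user streaming callback and records when the first chunk arrives (illustrative)."""

    def __init__(self, callback: Optional[Callable[[StreamingChunk], None]] = None):
        self.callback = callback
        self.completion_start_time: Optional[str] = None  # ISO-8601 timestamp of the first chunk

    def __call__(self, chunk: StreamingChunk) -> None:
        if self.completion_start_time is None:
            # First chunk seen: this is the time-to-first-token moment
            self.completion_start_time = datetime.now(timezone.utc).isoformat()
        if self.callback:
            self.callback(chunk)


# After the stream is consumed, the generator can merge the timestamp into the reply meta,
# which is what the Langfuse tracer fragment above reads back:
# reply.meta["completion_start_time"] = timer.completion_start_time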
Ok, thanks @LastRemote - I'll take a look at this again soon. I got sidetracked with some other higher-priority tasks, but I will get to it this week for sure.

@LastRemote I took a closer look and noticed that you implemented this feature in OpenAIGenerator. As we are going to deprecate all generators in the near future, let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator.

Awesome 💪
So I extend the abstract … This is the actual code I have for ChatGenerator:
from typing import Any, Callable, Dict, List, Optional

from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses import ChatMessage, StreamingChunk
from haystack.utils import deserialize_callable, serialize_callable
from typing_extensions import Self

# AssetReference, ChatModelService, and ModelServiceFactory are project-specific
# classes (not part of Haystack); their imports are omitted here.


@component
class ChatGenerator:
    """
    A class used to generate chat responses from a given chat model service.

    :param model_service: An instance of the ChatModelService to interact with the model.
    :param model: The name of the model to use for generating responses, if provided.
    :param stream: Whether to use streaming when calling the chat model service, defaults to False.
    :param streaming_callbacks: A list of callback functions to handle streaming responses, defaults to None.
    :param timeout: The timeout duration for the HTTP client, defaults to 20.0.
    :param generation_kwargs: Additional keyword arguments to pass to the model for generation, defaults to None.
    """

    def __init__(
        self,
        model_service: ChatModelService,
        model: Optional[str] = None,
        stream: bool = False,
        streaming_callbacks: Optional[List[Callable[[StreamingChunk], None]]] = None,
        timeout: Optional[float] = 20.0,
        generation_kwargs: Optional[Dict[str, Any]] = None,
    ):
        self.model_service = model_service
        self.model = model or ""
        self.stream = stream
        self.streaming_callbacks = streaming_callbacks
        self.generation_kwargs = generation_kwargs or {}
        self.timeout = timeout

    def to_dict(self) -> Dict[str, Any]:
        """
        Returns a serialized dictionary representation of the component.

        :return: A dictionary representation of the component.
        """
        # Create a local asset reference for the model service
        model_service_reference = self.model_service.create_local_reference()

        # Serialize the streaming callbacks
        callback_names = (
            [serialize_callable(callback) for callback in self.streaming_callbacks]
            if self.streaming_callbacks
            else None
        )
        return default_to_dict(
            self,
            model_service=model_service_reference.to_dict(),
            model=self.model,
            streaming_callbacks=callback_names,
            timeout=self.timeout,
            generation_kwargs=self.generation_kwargs,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        """
        Loads a ChatGenerator component from its serialized dictionary representation.

        :param data: The serialized dictionary representation of the component.
        :return: The deserialized ChatGenerator component.
        """
        # Make a shallow copy of init_parameters to avoid modifying the original data
        init_parameters_data = data.get("init_parameters", {}).copy()

        # Load the chat model service from reference
        model_service_reference = AssetReference.from_dict(init_parameters_data["model_service"])
        model_service = ModelServiceFactory.load_from_reference(model_service_reference)
        init_parameters_data["model_service"] = model_service

        # Deserialize the streaming callbacks into the copied dict (not the original data)
        serialized_callbacks = init_parameters_data.get("streaming_callbacks")
        if serialized_callbacks and isinstance(serialized_callbacks, list):
            init_parameters_data["streaming_callbacks"] = [
                deserialize_callable(callback) for callback in serialized_callbacks
            ]

        # Load the component
        return default_from_dict(cls, {"type": data["type"], "init_parameters": init_parameters_data})

    @component.output_types(replies=List[ChatMessage])
    def run(
        self,
        messages: List[ChatMessage],
        generation_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Generates responses for the given list of messages.

        :param messages: The list of messages to generate responses for.
        :param generation_kwargs: Additional keyword arguments to pass to the model for generation, defaults to None.
        :param kwargs: Additional keyword arguments to pass to the httpx client.
        :return: A dictionary containing the generated responses.
        """
        # Merge the generation kwargs passed to run() with the ones set at init time
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        completions = self.model_service.infer(
            messages,
            model=self.model,
            timeout=self.timeout,
            stream=self.stream,
            streaming_callbacks=self.streaming_callbacks,
            inference_kwargs=generation_kwargs,
            **kwargs,
        )
        return {"replies": completions}
Hey @vblagoje, can I ask for an update on the PR status? I am trying to keep things moving, and I am still happy to discuss more about the future deprecation of generators :)

Hey @LastRemote, in general your approach with a generic ChatGenerator seems nice, but there are many peculiarities with each chat generator in terms of parameters and how streaming is handled. We used this approach in Haystack 1.x for PromptNode and found that it turned into a maintenance nightmare. But YMMV, and in your use case it might be a perfect solution. Regarding the PR - I was awaiting the chat generator code; maybe we didn't understand each other?

Oh, there is some misunderstanding here. I am still not exactly sure what you have in mind after deprecating the current generators, and I am just offering my two cents here hoping it could be useful. Nonetheless, regarding the scope of this PR, I'd prefer that we propose a smaller change in …
Yes, exactly - we do understand each other - let's implement TTFT in OpenAIChatGenerator 💪
Exactly, that's what I meant as well.

Awesome, thanks! Any additional comments on the current code change? I described my intention for the current changes in a previous post. I will create a PR for the Langfuse integration part as well in the integrations repository.

@LastRemote thanks for your effort and patience! To clarify, let's focus on adding TTFT to OpenAIChatGenerator, not OpenAIGenerator, in this PR to keep it simple and aligned with our current architecture and objectives. Let me know if you have any questions!

Oh, I did mistakenly update the wrong generator then. Thanks for pointing that out; I will make another update today.
Updated the commit and cleaned up the git commit history.
Hey @LastRemote, have you checked if you get the TTFT field in the Langfuse traces?

@vblagoje Yes, it is working and I am using it in my applications right now. This does require some code changes in the Langfuse tracer to track the TTFT information and include it in the tracing span. I will create a PR for that today as well.

@vblagoje Here we go: deepset-ai/haystack-core-integrations#1161. Besides, the latest merge caused a lint issue, but I am not sure if it is relevant to this PR since it doesn't change any parameters in the OpenAI generator.

Thank you @LastRemote - I'll take a look soon and we can move this one forward!

Great contribution @LastRemote - keep them coming 🙏
Related Issues
Proposed Changes:
How did you test it?
Notes for the reviewer
… streaming_options.include_usage in OpenAI SDK, and thus still no usage data for streaming LLMs.

Checklist
… fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test:.